---
title: "Python 异步编程"
description: "从 JavaScript 开发者视角学习 Python 的异步编程、事件循环机制和异步 Web 开发"
---

## 1. 引言

### 为什么需要异步编程？

作为 JavaScript 开发者，你已经熟悉了 Promise、async/await 和事件循环的概念。Python 也有自己的异步编程模型，虽然语法相似，但底层机制有所不同。

**异步编程的核心价值**
- 提高程序性能：避免阻塞 I/O 操作
- 更好的资源利用：单线程处理多个并发任务
- 响应性：保持程序的响应能力
- 可扩展性：支持大量并发连接

> 💡 **学习策略**：将 Python 的异步编程视为 JavaScript 异步模型的"Python 版本"

## 2. 同步 vs 异步

### 2.1 基本概念对比

<PythonEditor title="同步 vs 异步对比" compare={true}>
```javascript !! js
// JavaScript 同步操作
function syncOperation() {
    console.log("开始同步操作");
    // 模拟耗时操作
    const start = Date.now();
    while (Date.now() - start < 2000) {
        // 阻塞 2 秒
    }
    console.log("同步操作完成");
    return "同步结果";
}

// JavaScript 异步操作
async function asyncOperation() {
    console.log("开始异步操作");
    // 模拟异步操作
    await new Promise(resolve => setTimeout(resolve, 2000));
    console.log("异步操作完成");
    return "异步结果";
}

// 同步调用（会阻塞）
console.log("调用同步函数");
const syncResult = syncOperation();
console.log("同步结果:", syncResult);

// 异步调用（不会阻塞）
console.log("调用异步函数");
asyncOperation().then(result => {
    console.log("异步结果:", result);
});
console.log("异步调用后立即执行");
```

```python !! py
import asyncio
import time

# Python 同步操作
def sync_operation():
    """同步操作"""
    print("开始同步操作")
    # 模拟耗时操作
    time.sleep(2)  # 阻塞 2 秒
    print("同步操作完成")
    return "同步结果"

# Python 异步操作
async def async_operation():
    """异步操作"""
    print("开始异步操作")
    # 模拟异步操作
    await asyncio.sleep(2)  # 非阻塞等待
    print("异步操作完成")
    return "异步结果"

# 同步调用（会阻塞）
print("调用同步函数")
sync_result = sync_operation()
print("同步结果:", sync_result)

# 异步调用（不会阻塞）
async def main():
    print("调用异步函数")
    result = await async_operation()
    print("异步结果:", result)

# 运行异步函数
print("异步调用后立即执行")
asyncio.create_task(main())
```
</PythonEditor>

### 2.2 事件循环机制

<PythonEditor title="事件循环机制对比" compare={true}>
```javascript !! js
// JavaScript 事件循环
console.log("1. 开始执行");

setTimeout(() => {
    console.log("4. 定时器回调");
}, 0);

Promise.resolve().then(() => {
    console.log("3. Promise 微任务");
});

console.log("2. 同步代码结束");

// 输出顺序：
// 1. 开始执行
// 2. 同步代码结束
// 3. Promise 微任务
// 4. 定时器回调
```

```python !! py
import asyncio

# Python 事件循环
async def main():
    print("1. 开始执行")
    
    # 创建任务
    task1 = asyncio.create_task(asyncio.sleep(0))
    task2 = asyncio.create_task(asyncio.sleep(0))
    
    print("2. 任务已创建")
    
    # 等待任务完成
    await task1
    print("3. 第一个任务完成")
    
    await task2
    print("4. 第二个任务完成")

# 运行事件循环
# 在 Pyodide 环境中，直接调用异步函数
asyncio.create_task(main())

# 输出顺序：
# 1. 开始执行
# 2. 任务已创建
# 3. 第一个任务完成
# 4. 第二个任务完成
```
</PythonEditor>

## 3. Python 异步编程基础

### 3.1 async/await 语法

Python 的 async/await 语法与 JavaScript 非常相似，但有一些重要的差异。

<PythonEditor title="async/await 语法对比" compare={true}>
```javascript !! js
// JavaScript async/await
async function fetchUserData(userId) {
    try {
        console.log(`开始获取用户 ${userId} 的数据`);
        
        // 模拟 API 调用
        const response = await new Promise((resolve, reject) => {
            setTimeout(() => {
                if (userId > 0) {
                    resolve({
                        id: userId,
                        name: `用户${userId}`,
                        email: `user${userId}@example.com`
                    });
                } else {
                    reject(new Error("用户不存在"));
                }
            }, 1000);
        });
        
        console.log("数据获取成功");
        return response;
        
    } catch (error) {
        console.error("获取数据失败:", error.message);
        throw error;
    }
}

async function processUsers() {
    console.log("开始处理用户数据");
    
    try {
        const user1 = await fetchUserData(1);
        const user2 = await fetchUserData(2);
        
        console.log("用户1:", user1);
        console.log("用户2:", user2);
        
        return [user1, user2];
        
    } catch (error) {
        console.error("处理用户数据失败:", error.message);
    }
}

// 调用异步函数
processUsers().then(users => {
    console.log("所有用户:", users);
});
```

```python !! py
import asyncio
import random

# Python async/await
async def fetch_user_data(user_id):
    """获取用户数据"""
    try:
        print(f"开始获取用户 {user_id} 的数据")
        
        # 模拟 API 调用
        await asyncio.sleep(1)  # 模拟网络延迟
        
        if user_id > 0:
            data = {
                "id": user_id,
                "name": f"用户{user_id}",
                "email": f"user{user_id}@example.com"
            }
            print("数据获取成功")
            return data
        else:
            raise ValueError("用户不存在")
            
    except Exception as error:
        print(f"获取数据失败: {error}")
        raise error

async def process_users():
    """处理用户数据"""
    print("开始处理用户数据")
    
    try:
        # 顺序执行
        user1 = await fetch_user_data(1)
        user2 = await fetch_user_data(2)
        
        print("用户1:", user1)
        print("用户2:", user2)
        
        return [user1, user2]
        
    except Exception as error:
        print(f"处理用户数据失败: {error}")

# 运行异步函数
async def main():
    users = await process_users()
    print("所有用户:", users)

# 在 Pyodide 环境中，直接调用异步函数
asyncio.create_task(main())
```
</PythonEditor>

### 3.2 并发执行

Python 提供了多种方式来实现并发执行，类似于 JavaScript 的 Promise.all()。

<PythonEditor title="并发执行对比" compare={true}>
```javascript !! js
// JavaScript 并发执行
async function fetchUserData(userId) {
    console.log(`开始获取用户 ${userId} 的数据`);
    // 模拟随机延迟
    const delay = 500 + Math.random() * 1000;
    await new Promise(resolve => setTimeout(resolve, delay));
    console.log(`完成获取用户 ${userId} 的数据`);
    return {
        id: userId,
        name: `用户${userId}`,
        email: `user${userId}@example.com`
    };
}

async function processUsersConcurrently() {
    console.log("开始并发获取用户数据 (Promise.all)");
    const promises = [
        fetchUserData(1),
        fetchUserData(2),
        fetchUserData(3)
    ];
    const users = await Promise.all(promises);
    console.log("所有用户数据:", users);
    return users;
}

// 使用 Promise.race() 获取最快的结果
async function getFastestUser() {
    console.log("\n开始竞速获取用户数据 (Promise.race)");
    const promises = [
        fetchUserData(1),
        fetchUserData(2),
        fetchUserData(3)
    ];
    const fastestUser = await Promise.race(promises);
    console.log("最快的用户:", fastestUser);
    return fastestUser;
}

// 调用函数
processUsersConcurrently();
getFastestUser();
```

```python !! py
import asyncio
import random

# Python 并发执行
async def fetch_user_data(user_id):
    """获取用户数据"""
    print(f"开始获取用户 {user_id} 的数据")
    # 模拟随机网络延迟
    delay = 0.5 + random.random()
    await asyncio.sleep(delay)
    print(f"完成获取用户 {user_id} 的数据")
    return {
        "id": user_id,
        "name": f"用户{user_id}",
        "email": f"user{user_id}@example.com"
    }

async def process_users_concurrently():
    """并发处理用户数据 (类似 Promise.all)"""
    print("开始并发获取用户数据 (asyncio.gather)")
    tasks = [
        fetch_user_data(1),
        fetch_user_data(2),
        fetch_user_data(3)
    ]
    users = await asyncio.gather(*tasks)
    print("所有用户数据:", users)
    return users

async def get_fastest_user():
    """获取最快的用户数据 (类似 Promise.race)"""
    print("\n开始竞速获取用户数据 (asyncio.wait)")
    tasks = [
        asyncio.create_task(fetch_user_data(1)),
        asyncio.create_task(fetch_user_data(2)),
        asyncio.create_task(fetch_user_data(3))
    ]
    
    # 等待第一个任务完成
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    
    # 获取已完成任务的结果
    fastest_user = done.pop().result()
    
    # 取消其他未完成的任务
    for task in pending:
        task.cancel()
        
    print(f"最快的用户: {fastest_user}")
    return fastest_user

# 运行并发函数
async def main():
    await process_users_concurrently()
    await get_fastest_user()

# 在 Pyodide 环境中，直接调用异步函数
asyncio.create_task(main())
```
</PythonEditor>

### 3.3 异步上下文管理器

Python 的异步上下文管理器类似于 JavaScript 的 try-with-resources 模式。

<PythonEditor title="异步上下文管理器" compare={true}>
```javascript !! js
// JavaScript 资源管理
class AsyncResource {
    constructor(name) {
        this.name = name;
        this.isOpen = false;
    }
    
    async open() {
        console.log(`打开资源: ${this.name}`);
        this.isOpen = true;
    }
    
    async close() {
        console.log(`关闭资源: ${this.name}`);
        this.isOpen = false;
    }
    
    async use() {
        if (!this.isOpen) {
            throw new Error("资源未打开");
        }
        console.log(`使用资源: ${this.name}`);
        await new Promise(resolve => setTimeout(resolve, 1000));
    }
}

// 手动资源管理
async function useResource() {
    const resource = new AsyncResource("数据库连接");
    
    try {
        await resource.open();
        await resource.use();
    } finally {
        await resource.close();
    }
}

// 使用资源
useResource();
```

```python !! py
import asyncio
from contextlib import asynccontextmanager

# Python 异步上下文管理器
class AsyncResource:
    """异步资源类"""
    
    def __init__(self, name):
        self.name = name
        self.is_open = False
    
    async def __aenter__(self):
        """进入上下文"""
        print(f"打开资源: {self.name}")
        self.is_open = True
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """退出上下文"""
        print(f"关闭资源: {self.name}")
        self.is_open = False
    
    async def use(self):
        """使用资源"""
        if not self.is_open:
            raise RuntimeError("资源未打开")
        print(f"使用资源: {self.name}")
        await asyncio.sleep(1)

# 使用异步上下文管理器
async def use_resource():
    """使用异步资源"""
    async with AsyncResource("数据库连接") as resource:
        await resource.use()

# 使用装饰器创建异步上下文管理器
@asynccontextmanager
async def managed_resource(name):
    """管理的资源上下文管理器"""
    print(f"打开资源: {name}")
    try:
        yield name
    finally:
        print(f"关闭资源: {name}")

async def use_managed_resource():
    """使用管理的资源"""
    async with managed_resource("文件连接") as resource:
        print(f"使用资源: {resource}")
        await asyncio.sleep(1)

# 运行示例
async def main():
    await use_resource()
    await use_managed_resource()

# 在 Pyodide 环境中，直接调用异步函数
asyncio.create_task(main())
```
</PythonEditor>

## 4. 异步 Web 开发

### 4.1 FastAPI 基础

FastAPI 是 Python 的现代异步 Web 框架，类似于 JavaScript 的 Express.js。

<PythonEditor title="FastAPI vs Express.js 对比" compare={true}>
```javascript !! js
// Express.js 服务器
const express = require('express');
const app = express();
const port = 3000;

// 中间件
app.use(express.json());

// 路由
app.get('/', (req, res) => {
    res.json({ message: 'Hello World!' });
});

app.get('/users/:id', async (req, res) => {
    try {
        const userId = parseInt(req.params.id);
        const user = await fetchUserData(userId);
        res.json(user);
    } catch (error) {
        res.status(404).json({ error: error.message });
    }
});

app.post('/users', async (req, res) => {
    try {
        const userData = req.body;
        const newUser = await createUser(userData);
        res.status(201).json(newUser);
    } catch (error) {
        res.status(400).json({ error: error.message });
    }
});

// 启动服务器
app.listen(port, () => {
    console.log(`服务器运行在 http://localhost:${port}`);
});

// 模拟异步函数
async function fetchUserData(userId) {
    await new Promise(resolve => setTimeout(resolve, 100));
    return { id: userId, name: `用户${userId}` };
}

async function createUser(userData) {
    await new Promise(resolve => setTimeout(resolve, 100));
    return { id: Date.now(), ...userData };
}
```

```python !! py
# FastAPI 服务器
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import uvicorn

app = FastAPI(title="用户 API", version="1.0.0")

# 数据模型
class User(BaseModel):
    name: str
    email: str
    age: Optional[int] = None

class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    age: Optional[int] = None

# 路由
@app.get("/")
async def root():
    """根路径"""
    return {"message": "Hello World!"}

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    """获取用户"""
    try:
        user = await fetch_user_data(user_id)
        return user
    except ValueError as error:
        raise HTTPException(status_code=404, detail=str(error))

@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(user: User):
    """创建用户"""
    try:
        new_user = await create_user_data(user)
        return new_user
    except ValueError as error:
        raise HTTPException(status_code=400, detail=str(error))

# 模拟异步函数
async def fetch_user_data(user_id: int) -> UserResponse:
    """获取用户数据"""
    await asyncio.sleep(0.1)  # 模拟数据库查询
    if user_id <= 0:
        raise ValueError("用户不存在")
    return UserResponse(
        id=user_id,
        name=f"用户{user_id}",
        email=f"user{user_id}@example.com",
        age=25
    )

async def create_user_data(user: User) -> UserResponse:
    """创建用户数据"""
    await asyncio.sleep(0.1)  # 模拟数据库插入
    return UserResponse(
        id=1,  # 实际应用中应该是数据库生成的 ID
        name=user.name,
        email=user.email,
        age=user.age
    )

# 启动服务器
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
</PythonEditor>

### 4.2 异步数据库操作

<PythonEditor title="异步数据库操作" compare={true}>
```javascript !! js
// JavaScript 异步数据库操作
const { Pool } = require('pg');

class DatabaseService {
    constructor() {
        this.pool = new Pool({
            host: 'localhost',
            database: 'testdb',
            user: 'username',
            password: 'password',
            port: 5432,
        });
    }
    
    async getUsers() {
        const client = await this.pool.connect();
        try {
            const result = await client.query('SELECT * FROM users');
            return result.rows;
        } finally {
            client.release();
        }
    }
    
    async getUserById(id) {
        const client = await this.pool.connect();
        try {
            const result = await client.query(
                'SELECT * FROM users WHERE id = $1',
                [id]
            );
            return result.rows[0];
        } finally {
            client.release();
        }
    }
    
    async createUser(userData) {
        const client = await this.pool.connect();
        try {
            const result = await client.query(
                'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING *',
                [userData.name, userData.email]
            );
            return result.rows[0];
        } finally {
            client.release();
        }
    }
}

// 使用数据库服务
const dbService = new DatabaseService();

async function handleGetUsers(req, res) {
    try {
        const users = await dbService.getUsers();
        res.json(users);
    } catch (error) {
        res.status(500).json({ error: error.message });
    }
}
```

```python !! py
# Python 异步数据库操作
import asyncpg
from typing import List, Optional
from pydantic import BaseModel

class User(BaseModel):
    id: Optional[int] = None
    name: str
    email: str

class DatabaseService:
    """数据库服务类"""
    
    def __init__(self):
        self.pool = None
    
    async def connect(self):
        """连接数据库"""
        self.pool = await asyncpg.create_pool(
            host='localhost',
            database='testdb',
            user='username',
            password='password',
            port=5432
        )
    
    async def close(self):
        """关闭数据库连接"""
        if self.pool:
            await self.pool.close()
    
    async def get_users(self) -> List[User]:
        """获取所有用户"""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch('SELECT * FROM users')
            return [User(**dict(row)) for row in rows]
    
    async def get_user_by_id(self, user_id: int) -> Optional[User]:
        """根据 ID 获取用户"""
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                'SELECT * FROM users WHERE id = $1',
                user_id
            )
            return User(**dict(row)) if row else None
    
    async def create_user(self, user: User) -> User:
        """创建用户"""
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING *',
                user.name, user.email
            )
            return User(**dict(row))

# 使用数据库服务
db_service = DatabaseService()

async def handle_get_users():
    """处理获取用户请求"""
    try:
        users = await db_service.get_users()
        return users
    except Exception as error:
        raise HTTPException(status_code=500, detail=str(error))

# 在 FastAPI 中使用
@app.get("/users", response_model=List[User])
async def get_users():
    """获取所有用户"""
    return await handle_get_users()

@app.get("/users/{user_id}", response_model=User)
async def get_user(user_id: int):
    """获取指定用户"""
    user = await db_service.get_user_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    return user

@app.post("/users", response_model=User, status_code=201)
async def create_user(user: User):
    """创建用户"""
    return await db_service.create_user(user)
```
</PythonEditor>

## 5. 异步迭代和生成器

### 5.1 异步迭代器

<PythonEditor title="异步迭代器对比" compare={true}>
```javascript !! js
// JavaScript 异步迭代器
class AsyncDataStream {
    constructor(data) {
        this.data = data;
        this.index = 0;
    }
    
    async *[Symbol.asyncIterator]() {
        for (const item of this.data) {
            // 模拟异步处理
            await new Promise(resolve => setTimeout(resolve, 100));
            yield item;
        }
    }
}

// 使用异步迭代器
async function processDataStream() {
    const stream = new AsyncDataStream([1, 2, 3, 4, 5]);
    
    for await (const item of stream) {
        console.log(`处理数据: ${item}`);
    }
}

// 异步生成器函数
async function* asyncGenerator() {
    for (let i = 0; i < 5; i++) {
        await new Promise(resolve => setTimeout(resolve, 100));
        yield i;
    }
}

async function useAsyncGenerator() {
    for await (const value of asyncGenerator()) {
        console.log(`生成的值: ${value}`);
    }
}

// 调用函数
processDataStream();
useAsyncGenerator();
```

```python !! py
import asyncio

# Python 异步迭代器
class AsyncDataStream:
    """异步数据流"""
    
    def __init__(self, data):
        self.data = data
        self.index = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.index >= len(self.data):
            raise StopAsyncIteration
        
        # 模拟异步处理
        await asyncio.sleep(0.1)
        item = self.data[self.index]
        self.index += 1
        return item

# 使用异步迭代器
async def process_data_stream():
    """处理异步数据流"""
    stream = AsyncDataStream([1, 2, 3, 4, 5])
    
    async for item in stream:
        print(f"处理数据: {item}")

# 异步生成器函数
async def async_generator():
    """异步生成器"""
    for i in range(5):
        await asyncio.sleep(0.1)
        yield i

async def use_async_generator():
    """使用异步生成器"""
    async for value in async_generator():
        print(f"生成的值: {value}")

# 运行示例
async def main():
    await process_data_stream()
    await use_async_generator()

# 在 Pyodide 环境中，直接调用异步函数
asyncio.create_task(main())
```
</PythonEditor>

### 5.2 异步上下文管理器与迭代器结合

<PythonEditor title="异步上下文管理器与迭代器" compare={true}>
```javascript !! js
// JavaScript 异步资源迭代器
class AsyncFileReader {
    constructor(filename) {
        this.filename = filename;
        this.fileHandle = null;
    }
    
    async open() {
        this.fileHandle = await open(this.filename, 'r');
    }
    
    async close() {
        if (this.fileHandle) {
            await this.fileHandle.close();
        }
    }
    
    async *readLines() {
        if (!this.fileHandle) {
            throw new Error('文件未打开');
        }
        
        for await (const line of this.fileHandle.readLines()) {
            yield line.trim();
        }
    }
}

// 使用异步文件读取器
async function processFile(filename) {
    const reader = new AsyncFileReader(filename);
    
    try {
        await reader.open();
        
        for await (const line of reader.readLines()) {
            if (line) {
                console.log(`处理行: ${line}`);
            }
        }
    } finally {
        await reader.close();
    }
}
```

```python !! py
import asyncio
import aiofiles

# Python 异步文件读取器
class AsyncFileReader:
    """异步文件读取器"""
    
    def __init__(self, filename):
        self.filename = filename
        self.file = None
    
    async def __aenter__(self):
        """进入上下文"""
        self.file = await aiofiles.open(self.filename, mode='r')
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """退出上下文"""
        if self.file:
            await self.file.close()
    
    async def read_lines(self):
        """读取文件行"""
        async for line in self.file:
            yield line.strip()

# 使用异步文件读取器
async def process_file(filename):
    """处理文件"""
    async with AsyncFileReader(filename) as reader:
        async for line in reader.read_lines():
            if line:
                print(f"处理行: {line}")

# 创建示例文件
async def create_sample_file():
    """创建示例文件"""
    async with aiofiles.open('sample.txt', mode='w') as f:
        await f.write("第一行\n")
        await f.write("第二行\n")
        await f.write("第三行\n")

# 运行示例
async def main():
    await create_sample_file()
    await process_file('sample.txt')

# 在 Pyodide 环境中，直接调用异步函数
asyncio.create_task(main())
```
</PythonEditor>

## 6. 实际项目示例

### 6.1 异步 Web 爬虫

<PythonEditor title="异步 Web 爬虫示例" compare={true}>
```javascript !! js
// JavaScript 异步爬虫
const axios = require('axios');
const cheerio = require('cheerio');

class AsyncWebCrawler {
    constructor() {
        this.visited = new Set();
        this.results = [];
    }
    
    async crawlPage(url) {
        if (this.visited.has(url)) {
            return;
        }
        
        this.visited.add(url);
        console.log(`爬取页面: ${url}`);
        
        try {
            const response = await axios.get(url);
            const $ = cheerio.load(response.data);
            
            // 提取标题
            const title = $('title').text();
            const links = $('a').map((i, el) => $(el).attr('href')).get();
            
            this.results.push({
                url,
                title,
                links: links.slice(0, 5) // 限制链接数量
            });
            
            console.log(`页面标题: ${title}`);
            
        } catch (error) {
            console.error(`爬取失败: ${url}`, error.message);
        }
    }
    
    async crawlMultiplePages(urls) {
        const promises = urls.map(url => this.crawlPage(url));
        await Promise.all(promises);
        
        return this.results;
    }
}

// 使用爬虫
async function main() {
    const crawler = new AsyncWebCrawler();
    const urls = [
        'https://example.com',
        'https://httpbin.org/get',
        'https://jsonplaceholder.typicode.com/posts/1'
    ];
    
    const results = await crawler.crawlMultiplePages(urls);
    console.log('爬取结果:', results);
}

// main();
```

```python !! py
import asyncio
import aiohttp
from bs4 import BeautifulSoup
from typing import List, Dict, Set
import time

class AsyncWebCrawler:
    """异步 Web 爬虫"""
    
    def __init__(self):
        self.visited: Set[str] = set()
        self.results: List[Dict] = []
    
    async def crawl_page(self, session: aiohttp.ClientSession, url: str):
        """爬取单个页面"""
        if url in self.visited:
            return
        
        self.visited.add(url)
        print(f"爬取页面: {url}")
        
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    html = await response.text()
                    soup = BeautifulSoup(html, 'html.parser')
                    
                    # 提取标题
                    title = soup.title.string if soup.title else "无标题"
                    links = [a.get('href') for a in soup.find_all('a', href=True)]
                    
                    self.results.append({
                        'url': url,
                        'title': title,
                        'links': links[:5]  # 限制链接数量
                    })
                    
                    print(f"页面标题: {title}")
                else:
                    print(f"HTTP 错误: {response.status}")
                    
        except Exception as error:
            print(f"爬取失败: {url} - {error}")
    
    async def crawl_multiple_pages(self, urls: List[str]):
        """并发爬取多个页面"""
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.crawl_page(session, url) 
                for url in urls
            ]
            await asyncio.gather(*tasks)
        
        return self.results

# 使用爬虫
async def main():
    crawler = AsyncWebCrawler()
    urls = [
        'https://httpbin.org/get',
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://api.github.com/users/octocat'
    ]
    
    start_time = time.time()
    results = await crawler.crawl_multiple_pages(urls)
    end_time = time.time()
    
    print(f"爬取完成，耗时: {end_time - start_time:.2f} 秒")
    print("爬取结果:", results)

# 运行爬虫
# 在 Pyodide 环境中，直接调用异步函数
asyncio.create_task(main())
```
</PythonEditor>

### 6.2 异步任务队列

<PythonEditor title="异步任务队列示例" compare={true}>
```javascript !! js
// JavaScript 异步任务队列
class AsyncTaskQueue {
    constructor(concurrency = 3) {
        this.concurrency = concurrency;
        this.running = 0;
        this.queue = [];
    }
    
    async add(task) {
        return new Promise((resolve, reject) => {
            this.queue.push({
                task,
                resolve,
                reject
            });
            this.process();
        });
    }
    
    async process() {
        if (this.running >= this.concurrency || this.queue.length === 0) {
            return;
        }
        
        this.running++;
        const { task, resolve, reject } = this.queue.shift();
        
        try {
            const result = await task();
            resolve(result);
        } catch (error) {
            reject(error);
        } finally {
            this.running--;
            this.process();
        }
    }
    
    async waitForAll() {
        while (this.queue.length > 0 || this.running > 0) {
            await new Promise(resolve => setTimeout(resolve, 100));
        }
    }
}

// 使用任务队列
async function processTask(id) {
    console.log(`开始处理任务 ${id}`);
    await new Promise(resolve => setTimeout(resolve, 1000 + Math.random() * 2000));
    console.log(`完成处理任务 ${id}`);
    return `任务 ${id} 的结果`;
}

async function main() {
    const queue = new AsyncTaskQueue(2);
    
    // 添加任务
    const promises = [];
    for (let i = 1; i <= 5; i++) {
        promises.push(queue.add(() => processTask(i)));
    }
    
    // 等待所有任务完成
    await queue.waitForAll();
    const results = await Promise.all(promises);
    
    console.log('所有任务完成:', results);
}

// main();
```

```python !! py
import asyncio
import random
from typing import List, Callable, Any
from dataclasses import dataclass

@dataclass
class Task:
    """任务类"""
    func: Callable
    args: tuple = ()
    kwargs: dict = None
    future: asyncio.Future = None
    
    def __post_init__(self):
        if self.kwargs is None:
            self.kwargs = {}
        if self.future is None:
            self.future = asyncio.Future()

class AsyncTaskQueue:
    """异步任务队列"""
    
    def __init__(self, concurrency: int = 3):
        self.concurrency = concurrency
        self.running = 0
        self.queue: List[Task] = []
        self.semaphore = asyncio.Semaphore(concurrency)
    
    async def add(self, func: Callable, *args, **kwargs) -> asyncio.Future:
        """添加任务到队列"""
        task = Task(func, args, kwargs)
        self.queue.append(task)
        asyncio.create_task(self._process_queue())
        return task.future
    
    async def _process_queue(self):
        """处理队列中的任务"""
        if not self.queue or self.running >= self.concurrency:
            return
        
        self.running += 1
        task = self.queue.pop(0)
        
        try:
            async with self.semaphore:
                result = await task.func(*task.args, **task.kwargs)
                task.future.set_result(result)
        except Exception as error:
            task.future.set_exception(error)
        finally:
            self.running -= 1
            if self.queue:
                asyncio.create_task(self._process_queue())
    
    async def wait_for_all(self):
        """等待所有任务完成"""
        while self.queue or self.running > 0:
            await asyncio.sleep(0.1)

# 使用任务队列
async def process_task(task_id: int) -> str:
    """处理任务"""
    print(f"开始处理任务 {task_id}")
    await asyncio.sleep(1 + random.random() * 2)
    print(f"完成处理任务 {task_id}")
    return f"任务 {task_id} 的结果"

async def main():
    queue = AsyncTaskQueue(2)
    
    # 添加任务
    futures = []
    for i in range(1, 6):
        future = await queue.add(process_task, i)
        futures.append(future)
    
    # 等待所有任务完成
    await queue.wait_for_all()
    results = await asyncio.gather(*futures)
    
    print("所有任务完成:", results)

asyncio.create_task(main())
```
</PythonEditor>

## 7. 练习题

### 练习 1：异步数据处理

<PythonEditor title="练习题 1：异步数据处理" compare={true}>
```javascript !! js
// 异步数据处理练习
async function processData(data) {
    // 模拟数据处理
    await new Promise(resolve => setTimeout(resolve, 100));
    return data.map(item => item * 2);
}

async function filterData(data) {
    // 模拟数据过滤
    await new Promise(resolve => setTimeout(resolve, 50));
    return data.filter(item => item > 10);
}

async function aggregateData(data) {
    // 模拟数据聚合
    await new Promise(resolve => setTimeout(resolve, 75));
    return data.reduce((sum, item) => sum + item, 0);
}

async function processDataPipeline() {
    const rawData = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    
    console.log("原始数据:", rawData);
    
    // 顺序处理
    const processed = await processData(rawData);
    console.log("处理后数据:", processed);
    
    const filtered = await filterData(processed);
    console.log("过滤后数据:", filtered);
    
    const result = await aggregateData(filtered);
    console.log("最终结果:", result);
    
    return result;
}

// 并发处理版本
async function processDataConcurrently() {
    const rawData = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    
    console.log("原始数据:", rawData);
    
    // 并发处理
    const [processed, filtered, aggregated] = await Promise.all([
        processData(rawData),
        filterData(rawData),
        aggregateData(rawData)
    ]);
    
    console.log("并发处理结果:", { processed, filtered, aggregated });
    return { processed, filtered, aggregated };
}

// 运行练习
processDataPipeline();
processDataConcurrently();
```

```python !! py
import asyncio
from typing import List

# 异步数据处理练习
async def process_data(data: List[int]) -> List[int]:
    """处理数据"""
    await asyncio.sleep(0.1)  # 模拟处理时间
    return [item * 2 for item in data]

async def filter_data(data: List[int]) -> List[int]:
    """过滤数据"""
    await asyncio.sleep(0.05)  # 模拟过滤时间
    return [item for item in data if item > 10]

async def aggregate_data(data: List[int]) -> int:
    """聚合数据"""
    await asyncio.sleep(0.075)  # 模拟聚合时间
    return sum(data)

async def process_data_pipeline():
    """数据处理管道（顺序处理）"""
    raw_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    
    print("原始数据:", raw_data)
    
    # 顺序处理
    processed = await process_data(raw_data)
    print("处理后数据:", processed)
    
    filtered = await filter_data(processed)
    print("过滤后数据:", filtered)
    
    result = await aggregate_data(filtered)
    print("最终结果:", result)
    
    return result

async def process_data_concurrently():
    """并发数据处理"""
    raw_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    
    print("原始数据:", raw_data)
    
    # 并发处理
    processed, filtered, aggregated = await asyncio.gather(
        process_data(raw_data),
        filter_data(raw_data),
        aggregate_data(raw_data)
    )
    
    print("并发处理结果:", {
        "processed": processed,
        "filtered": filtered,
        "aggregated": aggregated
    })
    
    return {
        "processed": processed,
        "filtered": filtered,
        "aggregated": aggregated
    }

# 运行练习
async def main():
    print("=== 顺序处理 ===")
    await process_data_pipeline()
    
    print("\n=== 并发处理 ===")
    await process_data_concurrently()

asyncio.create_task(main())
```
</PythonEditor>

### 练习 2：异步 API 客户端

<PythonEditor title="练习题 2：异步 API 客户端" compare={true}>
```javascript !! js
// 异步 API 客户端练习
class AsyncAPIClient {
    constructor(baseURL) {
        this.baseURL = baseURL;
    }
    
    async get(endpoint) {
        const response = await fetch(`${this.baseURL}${endpoint}`);
        if (!response.ok) {
            throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        return response.json();
    }
    
    async post(endpoint, data) {
        const response = await fetch(`${this.baseURL}${endpoint}`, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
            },
            body: JSON.stringify(data)
        });
        if (!response.ok) {
            throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        return response.json();
    }
    
    async batchGet(endpoints) {
        const promises = endpoints.map(endpoint => this.get(endpoint));
        return Promise.all(promises);
    }
}

// 使用 API 客户端
async function testAPIClient() {
    const client = new AsyncAPIClient('https://jsonplaceholder.typicode.com');
    
    try {
        // 单个请求
        const user = await client.get('/users/1');
        console.log('用户信息:', user);
        
        // 批量请求
        const [posts, comments, albums] = await client.batchGet([
            '/posts/1',
            '/comments/1',
            '/albums/1'
        ]);
        
        console.log('批量请求结果:', { posts, comments, albums });
        
        // POST 请求
        const newPost = await client.post('/posts', {
            title: '测试文章',
            body: '这是测试内容',
            userId: 1
        });
        
        console.log('新文章:', newPost);
        
    } catch (error) {
        console.error('API 请求失败:', error.message);
    }
}

// testAPIClient();
```

```python !! py
import aiohttp
import asyncio
from typing import List, Dict, Any

class AsyncAPIClient:
    """异步 API 客户端"""
    
    def __init__(self, base_url: str):
        self.base_url = base_url
    
    async def get(self, endpoint: str) -> Dict[str, Any]:
        """GET 请求"""
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}{endpoint}") as response:
                if response.status != 200:
                    raise aiohttp.ClientError(
                        f"HTTP {response.status}: {response.reason}"
                    )
                return await response.json()
    
    async def post(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """POST 请求"""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.base_url}{endpoint}",
                json=data
            ) as response:
                if response.status != 201:
                    raise aiohttp.ClientError(
                        f"HTTP {response.status}: {response.reason}"
                    )
                return await response.json()
    
    async def batch_get(self, endpoints: List[str]) -> List[Dict[str, Any]]:
        """批量 GET 请求"""
        async with aiohttp.ClientSession() as session:
            tasks = [
                self._get_with_session(session, endpoint)
                for endpoint in endpoints
            ]
            return await asyncio.gather(*tasks)
    
    async def _get_with_session(self, session: aiohttp.ClientSession, endpoint: str):
        """使用现有会话的 GET 请求"""
        async with session.get(f"{self.base_url}{endpoint}") as response:
            if response.status != 200:
                raise aiohttp.ClientError(
                    f"HTTP {response.status}: {response.reason}"
                )
            return await response.json()

# 使用 API 客户端
async def test_api_client():
    """测试 API 客户端"""
    client = AsyncAPIClient('https://jsonplaceholder.typicode.com')
    
    try:
        # 单个请求
        user = await client.get('/users/1')
        print('用户信息:', user)
        
        # 批量请求
        posts, comments, albums = await client.batch_get([
            '/posts/1',
            '/comments/1',
            '/albums/1'
        ])
        
        print('批量请求结果:', {
            'posts': posts,
            'comments': comments,
            'albums': albums
        })
        
        # POST 请求
        new_post = await client.post('/posts', {
            'title': '测试文章',
            'body': '这是测试内容',
            'userId': 1
        })
        
        print('新文章:', new_post)
        
    except Exception as error:
        print('API 请求失败:', error)

    # 运行测试
    # 在 Pyodide 环境中，直接调用异步函数
asyncio.create_task(main())
```
</PythonEditor>

## 8. 总结与扩展学习路径
Python 的异步编程能力，虽然起步稍晚，但如今已经非常成熟，适用于 I/O 密集型场景、Web 后端、爬虫、实时数据处理等领域。通过本教程，你已经完成了从基础语法到实际项目的全面掌握。

### 学习回顾
理解了 Python 中 async/await 的使用方式及其与 JavaScript 的异同

熟悉了事件循环、并发执行、异步生成器、上下文管理器等高级特性

结合 FastAPI 与 asyncpg 实现了完整的异步 Web 服务与数据库交互

编写了爬虫、任务队列、API 客户端等实用异步项目示例

练习了真实场景下的数据处理流程

### 推荐扩展
以下是下一步深入异步编程的学习建议：

#### 📚 框架与库
FastAPI：深入使用依赖注入、后台任务、中间件等功能

aiohttp：构建异步 HTTP 客户端和服务端

Starlette：FastAPI 的底层异步框架，可实现更低层级的异步 Web 服务

Trio / Curio：另类异步库，提供结构化并发的编程范式

#### 🛠️ 工具与调试
asyncio.TaskGroup (Python 3.11+)：更安全的并发执行模型

aiomonitor / aiodebug：调试异步事件循环

pytest-asyncio：编写异步测试用例

#### 💡 实践项目建议
实现一个 异步微服务架构，通过消息队列连接多个异步服务

构建一个 基于 WebSocket 的实时聊天室

编写一个 异步数据聚合器，整合多个 API 的异步响应

🧠 思考题：相比传统多线程，多进程编程，异步编程有哪些优势和限制？你会在哪些实际项目中优先选择异步方案？

🎉 到这里，你已经完成了 Python 异步编程的全景入门。如果你是来自 JavaScript 的背景，相信现在你对 Python 的异步世界已经不再陌生。继续深入探索，异步将成为你开发效率的重要武器！